package cn.doitedu.datayi.etl

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable.ListBuffer

/**
 *
 * @author 涛哥
 * @nick_name "deep as the sea"
 * @contact qq:657270652 wx:doit_edu
 * @site www.doitedu.cn
 * @date 2021-09-27     
 * @desc 位置归因策略的归因分析实战
 *       最首事件：权重40%
 *       最末事件：权重40%
 *       中间事件：平摊20%
 *
 *       -- 目标事件： e4
 *       -- 待归因事件： e1  e2  e3
 */
/**
 * Position-based attribution analysis.
 *
 * Strategy (applied per segment of events preceding one target-event occurrence):
 *   - exactly 1 event  -> weight 1.0
 *   - exactly 2 events -> 0.5 each
 *   - 3 or more        -> first 0.4, last 0.4, middle events share 0.2 equally
 * Weights of the same attributed event within one segment are summed.
 */
object RuleBasedAttrbution {

  /**
   * Splits one user's time-ordered event list into attribution segments.
   *
   * A segment is the run of candidate events strictly before one occurrence of
   * the target event. Events after the last target occurrence are deliberately
   * dropped — they did not lead to a conversion.
   *
   * @param events    one user's events as (guid, eventid, ts), already sorted by ts
   * @param destEvent the target (conversion) event id; defaults to "e4"
   * @return one list of candidate events per target-event occurrence; empty
   *         segments (e.g. two consecutive target events, or the target event
   *         being the user's first event) are skipped — they carry no
   *         attributable events and would otherwise crash the weighting step
   */
  private[etl] def segmentEvents(events: List[(Long, String, Long)],
                                 destEvent: String = "e4"): List[List[(Long, String, Long)]] = {
    val segmentList = new ListBuffer[List[(Long, String, Long)]]()
    var segment = new ListBuffer[(Long, String, Long)]()

    for (record <- events) {
      if (destEvent.equals(record._2)) {
        // Close the current segment at each target-event occurrence.
        // BUGFIX: only emit non-empty segments — the original emitted empty
        // lists here, which later hit lst(0) and threw IndexOutOfBoundsException.
        if (segment.nonEmpty) segmentList += segment.toList
        segment = new ListBuffer()
      } else {
        segment += record
      }
    }
    segmentList.toList
  }

  /**
   * Applies the position-based weighting strategy to one non-empty segment.
   *
   * @param segment candidate events (guid, eventid, ts) preceding one target event
   * @return (guid, eventid, weight) rows; duplicate event ids within the segment
   *         are merged with their weights summed (the original only merged in the
   *         size>=3 branch — now applied uniformly for consistency)
   */
  private[etl] def computeWeights(segment: List[(Long, String, Long)]): Iterable[(Long, String, Double)] = {
    val raw: List[(Long, String, Double)] = segment match {
      case Nil =>
        Nil // defensive: nothing to attribute
      case only :: Nil =>
        List((only._1, only._2, 1.0))
      case first :: last :: Nil =>
        List((first._1, first._2, 0.5), (last._1, last._2, 0.5))
      case _ =>
        val first  = segment.head
        val last   = segment.last
        val middle = segment.tail.init // everything between first and last
        (first._1, first._2, 0.4) ::
          (last._1, last._2, 0.4) ::
          middle.map(t => (t._1, t._2, 0.2 / middle.size))
    }
    // Merge rows for the same attributed event within this segment, summing weights.
    raw.groupBy(_._2).map(_._2.reduce((a, b) => (a._1, a._2, a._3 + b._3)))
  }

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("位置归因分析")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // Event detail table, already filtered to the target event (e4) and the
    // candidate attribution events (e1/e2/e3). Schema: guid, eventid, ts.
    val dtl: DataFrame = spark.read.table("dwd.events")
    //dtl.show(100,false)
    /**
     * +----+-------+---+
     * |guid|eventid|ts |
     * +----+-------+---+
     * |1   |e1     |1  |
     * |1   |e2     |2  |
     * |1   |e3     |3  |
     * |1   |e3     |4  |
     * |1   |e4     |5  |
     * |1   |e2     |7  |
     * |1   |e4     |9  |
     * |3   |e3     |1  |
     * |3   |e4     |2  |
     * |3   |e1     |3  |
     * |3   |e3     |5  |
     * |3   |e4     |6  |
     * |2   |e3     |1  |
     * |2   |e4     |2  |
     * +----+-------+---+
     */

    // Convert generic Rows to typed (guid, eventid, ts) tuples.
    val rdd: RDD[(Long, String, Long)] = dtl.rdd.map({
      case Row(guid: Long, eventid: String, ts: Long) => (guid, eventid, ts)
    })

    // Group by user, sort that user's events by timestamp, then cut the stream
    // into one segment per target-event occurrence.
    // NOTE(review): toList materialises a user's whole history in memory —
    // same limitation as the original; acceptable for this demo dataset.
    val segemented: RDD[List[(Long, String, Long)]] = rdd.groupBy(_._1)
      .flatMap(tp => segmentEvents(tp._2.toList.sortBy(_._3)))
    // segemented.take(100).foreach(println)

    /*
       List((1,e1,1), (1,e2,2), (1,e3,3), (1,e3,4))
       List((1,e2,7))
       List((2,e3,1))
       List((3,e3,1))
       List((3,e1,3), (3,e3,5))
    */

    // Apply the position-based weighting strategy to every segment.
    val res: RDD[(Long, String, Double)] = segemented.flatMap(computeWeights)

    import spark.implicits._
    res.toDF("guid","attr_ev","attr_factor")
      .selectExpr("guid"
        ,"'e4' as dest_ev"
        ,"'位置归因' as strategy "
        ,"attr_ev"
        ,"attr_factor"
        ,"'2021-09-26' as window_start"
        ,"'2021-09-26' as window_end")
      .show(100,false)

    /**
     * +----+-------+--------+-------+-----------+------------+----------+
       |guid|dest_ev|strategy|attr_ev|attr_factor|window_start|window_end|
       +----+-------+--------+-------+-----------+------------+----------+
       |1   |e4     |位置归因|e2     |0.1        |2021-09-26  |2021-09-26|
       |1   |e4     |位置归因|e1     |0.4        |2021-09-26  |2021-09-26|
       |1   |e4     |位置归因|e3     |0.5        |2021-09-26  |2021-09-26|
       |1   |e4     |位置归因|e2     |1.0        |2021-09-26  |2021-09-26|
       |2   |e4     |位置归因|e3     |1.0        |2021-09-26  |2021-09-26|
       |3   |e4     |位置归因|e3     |1.0        |2021-09-26  |2021-09-26|
       |3   |e4     |位置归因|e1     |0.5        |2021-09-26  |2021-09-26|
       |3   |e4     |位置归因|e3     |0.5        |2021-09-26  |2021-09-26|
       +----+-------+--------+-------+-----------+------------+----------+
     */

    spark.close()

  }
}
